Skip to content

feat: add workflow stage resume#747

Open
andreatgretel wants to merge 5 commits into
mainfrom
andreatgretel/feat/stage-level-resume
Open

feat: add workflow stage resume#747
andreatgretel wants to merge 5 commits into
mainfrom
andreatgretel/feat/stage-level-resume

Conversation

@andreatgretel

@andreatgretel andreatgretel commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

📋 Summary

Adds stage-level resume support for chained workflows so compatible completed stages can be reused, matching partial stages can continue through the existing single-stage resume path, and downstream stages rerun when upstream outputs change.

🔗 Related Issue

N/A

🔄 Changes

  • Add CompositeWorkflow.run(resume=...) with completed-stage reuse and partial-stage delegation.
  • Invalidate downstream stages when an upstream stage reruns, changes, or has missing selected/callback output.
  • Harden workflow metadata writes and let ResumeMode.IF_POSSIBLE fall back to fresh runs when prior metadata is unusable.
  • Add public workflow resume tests for skip, rerun, partial/failed-stage resume, callback-output, output-processor, completed-empty, corrupt metadata, and strict resume behavior.
  • Document workflow resume behavior in MkDocs and Fern docs.
  • Update the workflow chaining plan with the completed stage-level resume slice.

🧪 Testing

  • make test passes
  • Unit tests added/updated
  • E2E tests added/updated (if applicable)

Ran:

  • .venv/bin/ruff format .
  • .venv/bin/ruff check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
  • .venv/bin/ruff format --check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
  • .venv/bin/pytest packages/data-designer/tests/interface/test_composite_workflow.py -q - 55 passed, 2 warnings
  • .venv/bin/pytest /home/ubuntu/Code/reviews/DataDesigner-747/smoke_test.py -q -s - 2 passed against NVIDIA Build (nvidia/nemotron-3-nano-30b-a3b) and NVIDIA Inference (openai/openai/gpt-5.4-nano) using /home/ubuntu/Code/.env

Note: full .venv/bin/ruff check --fix . currently hits an unrelated existing generated-notebook lint in docs/colab_notebooks/7-nemotron-personas.ipynb (F404).

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated (if applicable)

@andreatgretel andreatgretel marked this pull request as ready for review June 11, 2026 19:58
@andreatgretel andreatgretel requested a review from a team as a code owner June 11, 2026 19:58
@github-actions

github-actions Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

MkDocs preview: https://7d708f5e.dd-docs-preview.pages.dev

Fern preview: https://nvidia-preview-pr-747.docs.buildwithfern.com/nemo/datadesigner

Fern previews include the docs-website version archive with PR changes synced into latest. Notebook tutorials are rendered without execution outputs in previews.

@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR implements stage-level resume for CompositeWorkflow, allowing compatible completed stages to be reused across runs and partial/failed stages to continue through the existing single-stage resume path. The implementation is carefully integrated with the existing fingerprint system and adds robust atomic metadata writes and path relativisation for workflow portability.

  • Adds workflow.run(resume=ResumeMode) with fingerprint-based stage skipping (IF_POSSIBLE falls back to fresh runs on unresolvable stages, ALWAYS raises), partial-stage delegation via ResumeMode.ALWAYS to DataDesigner.create(), and force_rerun_downstream propagation that rebuilds all descendants after any stage that runs or resumes.
  • Hardens metadata writes with an atomic os.replace() + fsync pattern and stores paths relative to workflow_path so artifacts can be relocated; previously written absolute paths are normalised on resume via _normalize_stage_path_metadata.
  • Adds 11 public tests covering skip, rerun, partial/failed delegation, callback-output invalidation, output-processor skip, moved-artifact portability, completed_empty propagation, corrupt-metadata fallback/strict, and ALWAYS-mode rejection.

Confidence Score: 5/5

Safe to merge — the resume logic, fingerprint matching, downstream invalidation, and atomic metadata writes are all correct.

The core logic traces correctly through every identified path: fingerprint-based stage reuse, force_rerun_downstream propagation, partial-stage delegation, completed_empty propagation, and error behaviour for strict-mode mismatches. The atomic write pattern and path relativisation are sound. Test coverage is comprehensive across all new code paths and edge cases.

No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer/src/data_designer/interface/composite_workflow.py Core resume implementation: adds prior metadata reading, fingerprint-based stage skipping, partial-stage delegation, downstream invalidation via force_rerun_downstream, atomic metadata writes, and path relativisation for portability. Logic traces correctly through all identified paths.
packages/data-designer/tests/interface/test_composite_workflow.py Adds 11 new resume tests covering skip, rerun, partial/failed-stage delegation, callback-output invalidation, output-processor skip, moved-artifact portability, empty-stage propagation, corrupt-metadata fallback/strict, and ALWAYS-mode rejection. _mark_stage_resumable correctly strips completion-only fields to match real interrupted-run metadata shape.
docs/concepts/workflow-chaining.md Adds resume section and removes "not implemented yet" bullet; docs match the implementation's IF_POSSIBLE/ALWAYS semantics.
fern/versions/latest/pages/concepts/workflow-chaining.mdx Mirrors the MkDocs resume section for Fern; identical prose and code snippet, consistent with implementation.
plans/workflow-chaining/workflow-chaining.md Updates plan status section to reflect completed stage-level resume slice; still-deferred items unchanged.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[workflow.run resume=X] --> B[Read prior workflow metadata]
    B --> C{resume=NEVER or\nno metadata?}
    C -- yes --> D[prior_metadata = None]
    C -- no --> E[prior_metadata loaded]
    D & E --> F[For each stage]

    F --> G{skipped_upstream_stage\nset?}
    G -- yes --> H[Mark stage skipped_empty_upstream\ncontinue]
    G -- no --> I[Compute stage_fingerprint]

    I --> J{prior_matches?\nnot force_rerun_downstream\nAND fingerprint match}
    J -- yes --> K{_can_skip_prior_stage?}
    K -- yes --> L[Restore from prior metadata\nset previous_seed_path / fingerprint\nif completed_empty: set skipped_upstream_stage\ncontinue]
    K -- no --> M{prior status in\nRESUMABLE_STAGE_STATUSES\nAND stage_path exists?}
    M -- yes --> N[stage_resume = ALWAYS]
    M -- no --> O{resume=ALWAYS\nAND NOT force_rerun_downstream?}
    O -- yes --> P[raise DataDesignerWorkflowError]
    O -- no --> Q[stage_resume = NEVER\ndelete stage_path if exists]
    N --> R[Run stage via DataDesigner.create\nresume=stage_resume]
    Q --> R

    J -- no --> O

    R --> S{stage has\noutput_processors?}
    S -- yes --> T[Delete output-processor dir\nRun output-processor create fresh]
    S -- no --> U[Determine output_seed_path\nvia on_success or output selection]
    T --> U
    U --> V{output_records == 0?}
    V -- yes, allow_empty --> W[status=completed_empty\nset skipped_upstream_stage]
    V -- no --> X[status=completed]
    W & X --> Y[force_rerun_downstream = True\nwrite metadata\ncontinue to next stage]
Loading

Reviews (4): Last reviewed commit: "Merge branch 'main' into andreatgretel/f..." | Re-trigger Greptile

Comment thread packages/data-designer/src/data_designer/interface/composite_workflow.py Outdated
Comment thread packages/data-designer/tests/interface/test_composite_workflow.py Outdated
@github-actions

Copy link
Copy Markdown
Contributor

Review: PR #747feat: add workflow stage resume

Summary

Adds stage-level resume to CompositeWorkflow.run(resume=...):

  • Reuses compatible completed stages (fingerprint match + output verifiable).
  • Delegates partial stages (running / failed) to the existing single-stage resume path via DataDesigner.create(..., resume=ResumeMode.ALWAYS).
  • Invalidates downstream stages on first re-run, change, or missing output.
  • Adds atomic, fsync'd workflow-metadata.json writes via tmp+os.replace.
  • Falls back to fresh runs in ResumeMode.IF_POSSIBLE when prior metadata is missing/corrupt; raises in ResumeMode.ALWAYS.
  • Tests for skip / rerun / partial-resume / callback-output / output-processor / completed-empty / corrupt metadata / strict-resume.
  • Docs added in both mkdocs and Fern; phase-3 plan updated.

Diff is well-scoped (5 files, +503/−10) and lives entirely in the interface layer — no engine/config touch, no import-direction violations.

Findings

Correctness — Medium

  1. output_seed_path is persisted and reused as an absolute path (composite_workflow.py:400, consumed at composite_workflow.py:309). If a user moves their artifact root (rsync, archive, container mount), prior_metadata still matches by name/stage_dir, but output_seed_path points at the old location while _stage_result_from_metadata builds an ArtifactStorage rooted at the new workflow_path. The result is a mixed-state read (old data, new storage object) or a _count_parquet_records raise — silently invalidating reuse in the best case, confusing behavior in the worst. Storing this as a path relative to workflow_path would be more robust. (Same applies to callback_output_path.)

  2. ResumeMode.ALWAYS semantics drift once any stage is re-run. The added doc says: "If a stage changed or its selected output is missing, the workflow raises instead of starting fresh." But once an ALWAYS resume successfully resumes a partial upstream stage from checkpoint (stage_resume == ALWAYS, line 328), force_rerun_downstream flips to True and the downstream elif resume == ResumeMode.ALWAYS and not force_rerun_downstream: guard (line 330) is bypassed — so a downstream stage whose fingerprint also differs from prior metadata silently runs fresh instead of raising. That may be the intended behavior, but it deviates from the doc and from the strict reading of the test_composite_workflow_resume_always_rejects_changed_stage contract. Consider tightening the doc and adding a test for "ALWAYS, upstream resumed-from-checkpoint, downstream fingerprint differs".

Correctness — Low

  1. _load_stage_analysis swallows every exception (composite_workflow.py:552):

    try:
        return DatasetProfilerResults.model_validate({...})
    except Exception:
        return None

    Bare except Exception masks both ValidationError (the only one you'd plausibly recover from) and unrelated bugs introduced by future schema changes. Narrow it to pydantic.ValidationError so a real bug doesn't decay into a silent analysis=None.

  2. stage_metadata.update(prior_stage_metadata) on the skip path (line 308) wholesale-imports every key the prior run wrote, including ones not produced by the current run (config, seed_path, seeded_from_stage, num_records_requested, duration_sec). That's by design here, but means the new metadata file's seeded_from_stage for a re-skipped stage may name a stage from the prior run that no longer exists in the current workflow definition. Fine for inspection; worth a note if later phases lean on those fields.

  3. _stage_result_from_metadata returns DatasetMetadata() — empty (line 528). The original run may have collected real DatasetMetadata. Reusing the cached stage exposes a stripped-down DatasetCreationResults to user code. Document or persist+rehydrate.

Style / Nits

  1. Unused import in TYPE_CHECKING removal: DatasetProfilerResults was lifted out of TYPE_CHECKING (line 18) because it's now used at module scope by _load_stage_analysis. That's correct, but DatasetProfilerResults is heavy; the project lazy-loads pandas already (lazy.pd). Confirm that data_designer.config.analysis.dataset_profiler doesn't drag in numpy/pandas at import — if it does, this regresses the import-time profile (make perf-import would catch it).

  2. tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}") (line 706): if two threads in the same process both write workflow metadata for the same workflow (uncommon but possible during async cleanup), they collide on the temp filename. Adding a uuid suffix would be safer; PID alone is not unique within a process. Probably out of scope.

  3. force_rerun_downstream is largely redundant with the upstream-fingerprint chain (since each stage's fingerprint folds in upstream_fingerprint). Its real job is to short-circuit the ResumeMode.ALWAYS raise on line 330. A short comment to that effect would help future readers — right now the flag's necessity is non-obvious.

Tests

Coverage is strong. Suggestions only:

  • Add a test pinning the ALWAYS-after-partial-upstream-resume behavior (see finding DataDesigner.make_seed_reference_from_file doesn't support paths with multiple parquet partition #2) — whichever way the project decides on, lock it in.
  • test_composite_workflow_resume_if_possible_skips_stage_with_output_processors checks main-batch mtime; consider also asserting the output-processors directory wasn't touched (its mtime is the actual cache-hit signal for the output-processor work).
  • Worth a regression test for prior metadata whose stages length exceeds the current workflow's stage list — _get_prior_stage_metadata handles index >= len(stages), but the inverse case (current workflow has fewer stages than prior) isn't explicitly covered.

Security

No new attack surface. Metadata is JSON-validated on read and treated as data. The os.fsync + os.replace pattern is the right defense against torn writes from crashes mid-run.

Performance

  • One extra _count_parquet_records call per skippable stage (validation in _can_skip_prior_stage + the actual count in run). Cheap (parquet metadata, not row read), but doubling it is unnecessary — could cache the count or skip the validation since the count immediately follows.
  • _stage_result_from_metadata instantiates ArtifactStorage with resume=ResumeMode.ALWAYS, which triggers the resolved_dataset_name check. Fine for hits; on misses you'll get an ArtifactStorageError from the validator instead of going through DataDesignerWorkflowError. Consider catching and re-wrapping at the boundary so callers see a single error type per the project's "Errors normalize at boundaries" invariant.

Docs

  • docs/concepts/workflow-chaining.md and the Fern mirror are updated symmetrically — good.
  • Phase-3 plan update accurately describes the slice and remaining deferred items.
  • One precision nit: the ResumeMode.ALWAYS blurb should clarify the "first changed stage raises; downstream of a checkpoint-resumed stage runs fresh" behavior (or close the gap).

Verdict

Solid, well-tested addition. The core logic is correct and the new behavior is opt-in through resume=, so risk to existing callers is minimal. Address #1 (relative paths) before this is relied on for portable artifacts; #2 (ALWAYS doc/test) and #3 (narrow the except) are easy follow-ups. Everything else is taste.

Recommendation: approve with the path-portability and except Exception narrowing addressed (or filed as follow-ups). Other findings are non-blocking polish.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant